Understanding an Application Programming Interface (API):
APIs are incredible tools that allow one piece of software to access another. What exactly does that mean?
Well, for example, imagine you're using a weather app on your smartphone. When you
open the app to check the weather forecast, the app does not generate this data itself. Instead, it sends a
request to a weather service API, and the API sends back the latest weather information.
In this example, the software on the phone 'gets' data from the weather API.
Similarly, Microsoft Flight Simulator has its own way of interacting with external software through an API.
Microsoft included this capability in a software development kit (SDK), and their API is called SimConnect.
Just like the weather app example, SimConnect allows external applications to 'set' and 'get' data in MSFS,
enabling developers to access flight parameters, control certain variables in the simulator, and create tools that
enhance the simulation experience. This makes SimConnect a powerful interface for anyone looking to extend
the capabilities of MSFS beyond its built-in features, which is what we are going to do here. You can find
a Python-adapted version of SimConnect here.
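To make the 'get' and 'set' idea concrete, here is a minimal sketch. It assumes only that the request object exposes get(name) and set(name, value), which are the two calls this project makes on the Python SimConnect request object. FakeRequests is a hypothetical stand-in so the snippet runs without the simulator; with MSFS running you would pass in the real object instead.

```python
class FakeRequests:
    """Hypothetical stand-in for the SimConnect request object (assumption)."""

    def __init__(self):
        self._values = {"PLANE_LATITUDE": 43.6310, "PLANE_LONGITUDE": -79.3893}

    def get(self, name):
        # Unknown variables come back as None in this stand-in.
        return self._values.get(name)

    def set(self, name, value):
        self._values[name] = value


def read_position(aq):
    """'Get' the aircraft position through the interface."""
    return {
        "lat": aq.get("PLANE_LATITUDE"),
        "lon": aq.get("PLANE_LONGITUDE"),
    }


def move_aircraft(aq, lat, lon):
    """'Set' a new aircraft position through the same interface."""
    aq.set("PLANE_LATITUDE", float(lat))
    aq.set("PLANE_LONGITUDE", float(lon))


aq = FakeRequests()
before = read_position(aq)
move_aircraft(aq, 51.4700, -0.4543)
after = read_position(aq)
print(before, after)
```

The helper functions never care whether they are talking to the stand-in or the simulator, which is the whole point of an API: the caller only depends on the agreed interface.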
Application Features:
I designed this software to do more than record flight data. To that end, I wrote a GUI application that
also gives the user access to other utilities I wanted in the simulator. These include:
Local Flight Record Util:
0. Once started, the recorder stands by on the main menu screen until it detects that a flight has started.
1. A new flight is detected.
2. A new flight directory is created in the ./data directory, numbered sequentially with an f prefix, i.e. f1, f2, f3.
3. A new ANG_FLIGHT_NUMBER is assigned, corresponding to the flight number directory in ./data.
4. A flight header .pkl file is created at ./data/f#/f#_Flight_Header.pkl.
5. A flight data .pkl file is created at ./data/f#/f#.pkl. (This file actively records the current flight data every few seconds.)
6. Once the flight is detected to have concluded, the recorder returns to standby until a new flight is detected (step 1).
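The directory and file layout in steps 2 to 5 can be sketched as follows. This is an illustration under stated assumptions, not the recorder's actual code: next_flight_name and start_flight are hypothetical helpers, and the next number is found by parsing existing directory names rather than by whatever method the recorder module uses.

```python
import os
import pickle
import re
import tempfile


def next_flight_name(data_root):
    """Return the next sequential flight name (f1, f2, ...) under data_root."""
    numbers = []
    for entry in os.listdir(data_root):
        match = re.fullmatch(r"f(\d+)", entry)
        if match and os.path.isdir(os.path.join(data_root, entry)):
            numbers.append(int(match.group(1)))
    return f"f{max(numbers, default=0) + 1}"


def start_flight(data_root, header):
    """Create the flight directory and write the header and data files."""
    os.makedirs(data_root, exist_ok=True)
    fnum = next_flight_name(data_root)
    flight_dir = os.path.join(data_root, fnum)
    os.makedirs(flight_dir)
    # Stamp the header with the flight number that matches the directory.
    header = dict(header, ANG_FLIGHT_NUMBER=fnum)
    with open(os.path.join(flight_dir, f"{fnum}_Flight_Header.pkl"), "wb") as fh:
        pickle.dump(header, fh)
    with open(os.path.join(flight_dir, f"{fnum}.pkl"), "wb") as fh:
        pickle.dump({}, fh)  # Empty placeholder; samples would be written later.
    return fnum


# Demonstrate in a throwaway directory instead of ./data.
with tempfile.TemporaryDirectory() as root:
    first = start_flight(root, {"ATC_MODEL": "demo"})
    second = start_flight(root, {"ATC_MODEL": "demo"})
    created = sorted(os.listdir(os.path.join(root, second)))

print(first, second, created)
```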
Aircraft Shutdown Util:
Start All Aircraft Systems: Starts all engines, electronics, and avionics.
Stop All Aircraft Systems: Stops all engines, electronics, and avionics.
Aircraft Fast Travel Util:
Given a destination latitude, longitude, and altitude, teleports the aircraft there instantly.
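Because fast travel writes user-typed values straight into the simulator, it can help to validate them first. The sketch below is one way to do that; parse_fast_travel is a hypothetical helper that is not part of the application, and the range limits are ordinary geographic bounds rather than anything taken from SimConnect.

```python
def parse_fast_travel(lat_text, lon_text, alt_text):
    """Validate fast-travel inputs; a blank field becomes None (leave unchanged)."""

    def parse(text, low, high, label):
        text = text.strip()
        if text == "":
            return None
        value = float(text)  # Raises ValueError for non-numeric input.
        if not low <= value <= high:
            raise ValueError(f"{label} must be between {low} and {high}")
        return value

    return (
        parse(lat_text, -90.0, 90.0, "LAT"),
        parse(lon_text, -180.0, 180.0, "LON"),
        parse(alt_text, 0.0, float("inf"), "ALT"),
    )


target = parse_fast_travel("43.63100084864455", "-79.38926956621822", "")
print(target)
```

A GUI could catch the ValueError and show it in a message box instead of sending a bad coordinate to the simulator.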
Repair and Refuel Util:
Fully repairs and refuels the aircraft instantly.
ANG Sim Dashboard Util:
Displays, in an easy-to-read format, the current aircraft latitude, longitude, altitude, GPS distance to waypoint, and SIM RATE.
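As a small illustration of what "easy to read" can mean for a dashboard, the sketch below formats a raw reading to a fixed number of decimals and falls back to a placeholder when no value is available yet. format_reading is a hypothetical helper, not a function from the application.

```python
def format_reading(value, decimals=4, scale=1.0):
    """Format a numeric reading for display; show '...' when there is no value."""
    if value is None:
        return "..."
    return f"{value * scale:.{decimals}f}"


lat_text = format_reading(43.63100084864455)
# Waypoint distance arrives in meters; scale it to kilometers for display.
dist_text = format_reading(12500.0, decimals=1, scale=0.001)
missing = format_reading(None)
print(lat_text, dist_text, missing)
```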
Application Architecture:
I wrote four key files to create this application.
ANG_MSFS_2020_Flight_Data_Recorder.py - This is where the GUI lives and breathes.
ANG_Flight_Recorder_v_0_5.py - Supplies the GUI with everything it needs to operate.
ANG_flight_data_converter.py - Allows the user to easily convert one flight, or all flights not already converted, to CSV.
ang_flight_data_reader_utils.py - Supplies the functionality used by ANG_flight_data_converter.py.
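To give a feel for the conversion step, here is a minimal sketch of turning recorded flight data into CSV. It assumes the data is a dictionary that maps each variable name to a list of samples of equal length; flight_dict_to_csv is a hypothetical helper and is not taken from the converter files.

```python
import csv
import io


def flight_dict_to_csv(flight_dict, out_file):
    """Write a dict of equal-length lists as CSV: one column per key, one row per sample."""
    columns = list(flight_dict)
    writer = csv.writer(out_file)
    writer.writerow(columns)
    writer.writerows(zip(*(flight_dict[name] for name in columns)))


sample = {"PLANE_LATITUDE": [43.63, 43.64], "PLANE_ALTITUDE": [1500.0, 1520.5]}
buffer = io.StringIO()  # Stands in for an open .csv file.
flight_dict_to_csv(sample, buffer)
csv_text = buffer.getvalue()
print(csv_text)
```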
ANG_MSFS_2020_Flight_Data_Recorder.py
# -*- coding: utf-8 -*-
"""
Created on Mon May 6 11:49:55 2024
@author: ANG
"""
# IMPORTS
import time
import sys
import math
import ANG_Flight_Recorder_v_0_5 as angflightrec
from PyQt5.QtWidgets import (
QApplication, QPushButton, QVBoxLayout, QWidget, QLabel,
QListWidget, QStackedWidget, QHBoxLayout, QMessageBox, QLineEdit, QTextEdit
)
from PyQt5.QtGui import QFont, QColor
from PyQt5.QtCore import (QObject, pyqtSignal, QTimer, Qt, QRunnable,
QThreadPool, pyqtSlot)
class WorkerSignals(QObject):
'''
Class defines signals available from a running worker thread. Without this
the WorkerThread cannot send (emit) data to the main application.
Signals:
message_text : string data to send to main thread via emit().
'''
message_text = pyqtSignal(str)
class WorkerThread(QRunnable):
'''
Worker thread class. Inherits from QRunnable to handle worker thread setup,
signals and wrap-up.
'''
def __init__(self, _SM, _AQ, _AE, _TF, *args, **kwargs):
super(WorkerThread, self).__init__()
# Store constructor arguments (re-used for processing)
self.running = True
self._SM = _SM
self._AQ = _AQ
self._AE = _AE
self._TF = _TF
self.args = args
self.kwargs = kwargs
self.signals = WorkerSignals()
self.is_paused = False
self.flight_dictionary = None
self.ang_fnum = None
@pyqtSlot()
def run(self):
'''
Run the worker loop with the passed args and kwargs. Iterates about
once every second.
'''
# DO HEAVY LIFTING HERE
while self.running:
time.sleep(1.0)
# Check if we are in a flight
self.in_flight = self.in_current_flight()
# Get current flight number
# self.current_flight_num = angflightrec.get_flight_number(self._AQ)
# Get last flight number in data directory
self.last_flight_num = angflightrec.get_last_flight_num()
if self.is_paused:
self.message_text = "RECORD PAUSED."
self.signals.message_text.emit(self.message_text)
continue # Skip to the next iteration
if not self.in_flight:
self.message_text = "WAITING FOR FLIGHT..."
self.signals.message_text.emit(self.message_text)
# Reset flight dictionary since flight has ended
self.flight_dictionary = None
# Reset flight number since flight has ended
self.ang_fnum = None
continue # Skip to the next iteration
# At this point, we are in flight
if self.flight_dictionary is None:
# Start a new flight
self.wait_loading(30)
self.start_new_flight()
self.emmit_header()
else:
# Continue recording flight data
updated_dict = angflightrec.active_record(
self.flight_dictionary, self._AQ, self._TF, self.ang_fnum)
self.emmit_header()
def stop(self):
'''
Function signals for the thread to stop.
Returns
-------
None.
'''
self.running = False
return
def wait_loading(self, int_load_time):
'''
Function used to allow time to load.
Returns
-------
None.
'''
for i in range(int_load_time):
time.sleep(1)
t_minus = int_load_time - i
self.message_text = f"Loading: {t_minus}"
self.signals.message_text.emit(self.message_text)
return
def emmit_header(self):
'''
Function emits header to app.
Returns
-------
None.
'''
if self.ang_fnum is None:
print("NO FLIGHT NUMBER")
else:
try:
dir_str = f'./data/{self.ang_fnum}/{self.ang_fnum}_Flight_Header.pkl'
self.header_data = angflightrec.load_data(dir_str)
self.header_str = 'RECORDING:\n--FLIGHT HEADER--\n'
for k,v in self.header_data.items():
self.header_str += str(k) + " : " + str(v) + "\n"
self.message_text = self.header_str + f"\nRecording in Directory {dir_str}"
self.signals.message_text.emit(self.message_text)
except FileNotFoundError as e:
# If the flight directory is absent the flight has not started yet
# or ATC assigned a flight number prematurely
print('EMMIT HEADER WAITING...')
# self.start_new_flight()
return
def check_master_systems_on(self):
'''
Function checks AVIONICS_MASTER_SWITCH and ELECTRICAL_MASTER_BATTERY.
If both on returns True. If both off returns False. Else returns False.
Returns
-------
master_systems_on : Bool
'''
self.avionics_master_check = self._AQ.get("AVIONICS_MASTER_SWITCH")
self.electrical_master_bat_check = self._AQ.get("ELECTRICAL_MASTER_BATTERY")
if self.avionics_master_check == 0.0 and self.electrical_master_bat_check == 0.0:
master_systems_on = False
elif self.avionics_master_check == 1.0 and self.electrical_master_bat_check == 1.0:
master_systems_on = True
else:
master_systems_on = False
return master_systems_on
def in_current_flight(self):
'''
Function checks if currently in flight. The main menu default coordinates
signify that the flight has ended.
Returns
-------
in_current_flight : Bool
If in flight True else False.
'''
curr_pos_lat = self._AQ.get("PLANE_LATITUDE")
curr_pos_lon = self._AQ.get("PLANE_LONGITUDE")
curr_pos_alt = self._AQ.get("PLANE_ALTITUDE")
default_end_pos_lat = round(0.000407442168686809,4)
default_end_pos_lon = round(0.01397450300629543,4)
# default_end_pos_alt = round(3.276148519246465,4)
in_current_flight = None
if (curr_pos_lat is not None and
curr_pos_lon is not None and
curr_pos_alt is not None):
curr_pos_lat = round(curr_pos_lat,4)
curr_pos_lon = round(curr_pos_lon,4)
curr_pos_alt = round(curr_pos_alt,4)
else:
self.in_current_flight()
if (curr_pos_lat is not None and
curr_pos_lon is not None and
curr_pos_alt is not None and
curr_pos_lat == default_end_pos_lat and
curr_pos_lon == default_end_pos_lon and
curr_pos_alt > 50):
in_current_flight = False
else:
in_current_flight = True
self.ang_fnum = angflightrec.get_last_flight_num()
return in_current_flight
def start_new_flight(self):
'''
Starts new flight via creating a new flight directory for header
and flight data--in ./data directory.
Returns
-------
None.
'''
angflightrec.check_set_last_dir()
self.message_text = "Creating Flight Header..."
self.signals.message_text.emit(self.message_text)
self.header_data = angflightrec.make_flight_header(self._AQ, self._TF)
self.message_text = "Creating Flight Dictionary..."
self.signals.message_text.emit(self.message_text)
self.flight_dictionary = angflightrec.get_flight_dictionary()
self.current_flight_num = angflightrec.get_last_flight_num() # CURRENT FLIGHT NUMBER FROM DIR
return
class SimUtilsApp(QWidget):
def __init__(self):
super(SimUtilsApp, self).__init__()
# SET APP FONT
self.setFont(QFont('consolas',11))
# INSTANTIATE THE LIST
self.leftlist = QListWidget()
self.leftlist.setMaximumSize(225,1000)
# ADD ITEMS TO LIST
self.leftlist.insertItem(0, 'Local Flight Record')
self.leftlist.insertItem(1, 'Aircraft Shutdown Util')
self.leftlist.insertItem(2, 'Aircraft Fast Travel')
self.leftlist.insertItem(3, 'Repair and Refuel')
self.leftlist.insertItem(4, 'ANG Sim Dashboard')
self.leftlist.resize(50, 50)
# INSTANTIATE LAYOUT USER INTERFACES AS WIDGETS
self.stack0 = QWidget()
self.stack1 = QWidget()
self.stack2 = QWidget()
self.stack3 = QWidget()
self.stack4 = QWidget()
# CALL THE STACKS
self.stack0UI()
self.stack1UI()
self.stack2UI()
self.stack3UI()
self.stack4UI()
# ADD STACKS TO MAIN STACK
self.Stack = QStackedWidget (self)
self.Stack.addWidget (self.stack0)
self.Stack.addWidget (self.stack1)
self.Stack.addWidget (self.stack2)
self.Stack.addWidget (self.stack3)
self.Stack.addWidget (self.stack4)
self.Stack.setMaximumSize(800,800)
# ADD SUBLAYOUT STACK/STACKS TO MAIN LAYOUT STACK
hbox = QHBoxLayout(self)
hbox.addWidget(self.leftlist)
hbox.addWidget(self.Stack, alignment=Qt.AlignTop)
# STYLES AND CALL UI
self.setLayout(hbox)
self.setMinimumSize(300,200)
self.leftlist.currentRowChanged.connect(self.display)
self.switch = 0 # On/Off proper connect to SimConnect
self.switch_rec = 0
self.setWindowTitle('ANG MSFS 2020 Flight Data Recorder')
self.show()
# Create Threadpool
self.threadpool = QThreadPool()
def connect_to_sim(self):
'''
Function checks that an instance of Microsoft Flight Simulator is running.
If no instance is running closes application with advice.
Returns
-------
None.
'''
try:
# Create SimConnect link
self._SM = angflightrec.connect_sm() # SimConnect()
# Note the default _time is 2000 to be refreshed every 2 seconds
self._AQ = angflightrec.connect_aq(self._SM) # AircraftRequests(self._SM, _time=2000)
self._AE = angflightrec.connect_ae(self._SM) # AircraftEvents(self._SM)
self._TF = angflightrec.connect_tf()
self.switch = 1 # On/Off proper connect to SimConnect
angflightrec.check_test_data_dir()
angflightrec.check_test_csv_data_dirs()
except ConnectionError:
msg = QMessageBox()
msg.setIcon(QMessageBox.Critical)
msg.setWindowTitle("MSFS Not Detected...")
msg.setText("Connection Error. Microsoft Flight Simulator Must Be Running.")
x = msg.exec_()
self.close()
sys.exit(0)
return
connect_to_sim(self)
def stack0UI(self):
# WRITE THE STACK FOR CONNECTING TO THE MySQL DB HERE
# INSTANTIATE THE UI 0 LAYOUT(S)
layout = QVBoxLayout()
# INSTANTIATE WIDGETS
text_input_diag = QTextEdit()
start_push_button = QPushButton('MYSQL CONNECT')
start_record_button = QPushButton('START LOCAL RECORD')
pause_record_button = QPushButton("PAUSE LOCAL RECORD")
resume_record_button = QPushButton("RESUME LOCAL RECORD")
self.header_switch = 0
self.text_trigger_switch = 0
self.worker_true = False
# WIDGET STYLES
text_input_diag.setReadOnly(True)
text_input_diag.setStyleSheet("background-color: black; border: 1px solid green;")
text_input_diag.setTextColor(QColor("green"))
start_record_button.setStyleSheet("background-color: #64c851") # GREEN
pause_record_button.setStyleSheet("background-color: #f42525") # RED
resume_record_button.setStyleSheet("background-color: #FFFF00") # YELLOW
# ADD WIDGETS TO LAYOUT
layout.addWidget(text_input_diag)
layout.addWidget(start_push_button)
layout.addWidget(start_record_button)
layout.addWidget(pause_record_button)
layout.addWidget(resume_record_button)
resume_record_button.hide()
pause_record_button.hide()
start_push_button.hide()
# INSTANTIATE METHODS OF THE STACK
def pause_flight_record(self):
'''
Function pauses the flight recorder.
Returns
-------
None.
'''
pause_record_button.hide()
resume_record_button.show()
self.worker.is_paused = True
text_input_diag.setText("RECORD PAUSE")
return
def resume_flight_record(self):
'''
Function resumes flight recorder.
Returns
-------
None.
'''
resume_record_button.hide()
pause_record_button.show()
self.worker.is_paused = False
return
def update_progress(self, str_value):
'''
Function is sent str_value from worker thread to display in app.
Parameters
----------
str_value : String
String value emitted from worker thread.
Returns
-------
None.
'''
text_input_diag.clear()
self.message_from_thread = str_value
if self.message_from_thread.startswith('RECORDING'):
t = self.message_from_thread.split("\n")
for i in t:
text_input_diag.append(i)
else:
text_input_diag.setText(str_value)
return
def start_flight_record(self):
'''
Function starts flight recorder by instantiating an instance of
a worker thread which operates to record the flight.
Returns
-------
None.
'''
start_record_button.hide()
pause_record_button.show()
self.worker = WorkerThread(self._SM, self._AQ, self._AE, self._TF)
self.worker.setAutoDelete(True)
self.worker.signals.message_text.connect(lambda text: update_progress(self, text))  # Use the emitted string directly
self.worker_true = True
self.threadpool.start(self.worker)
return
# CONNECT METHODS TO WIDGETS
start_record_button.clicked.connect(lambda checked: start_flight_record(self))
pause_record_button.clicked.connect(lambda checked: pause_flight_record(self))
resume_record_button.clicked.connect(lambda checked: resume_flight_record(self))
# APPLY THE STACK
self.stack0.setLayout(layout)
def stack1UI(self):
# INSTANTIATE THE UI 1 LAYOUT(S)
layout = QVBoxLayout()
# INSTANTIATE WIDGETS
start_push_button = QPushButton('Start All Aircraft Systems')
stop_push_button = QPushButton('Stop All Aircraft Systems')
start_push_button.setStyleSheet("background-color: #64c851")
stop_push_button.setStyleSheet("background-color: #f42525")
# ADD WIDGETS TO LAYOUT
layout.addWidget(QLabel('Buttons start/stop all systems include: \n'\
'ENGINE_AUTO_START\n'\
'TOGGLE_MASTER_BATTERY_ALTERNATOR\n'\
'TOGGLE_AVIONICS_MASTER'))
layout.addWidget(start_push_button)
layout.addWidget(stop_push_button)
# INSTANTIATE METHODS OF THE STACK
def start_all_systems(self):
event_to_trigger = self._AE.find("ENGINE_AUTO_START")
event_to_trigger()
toggle_master_batt_alternator = self._AE.find("TOGGLE_MASTER_BATTERY_ALTERNATOR")
toggle_master_batt_alternator()
# TOGGLE_AVIONICS_MASTER
toggle_avionics_mast = self._AE.find("TOGGLE_AVIONICS_MASTER")
toggle_avionics_mast()
return
def stop_all_systems(self):
event_to_trigger = self._AE.find("ENGINE_AUTO_SHUTDOWN")
event_to_trigger()
toggle_master_batt_alternator = self._AE.find("TOGGLE_MASTER_BATTERY_ALTERNATOR")
toggle_master_batt_alternator()
toggle_avionics_mast = self._AE.find("TOGGLE_AVIONICS_MASTER")
toggle_avionics_mast()
return
# CONNECT METHODS TO WIDGETS
start_push_button.clicked.connect(lambda checked: start_all_systems(self))
stop_push_button.clicked.connect(lambda checked: stop_all_systems(self))
#APPLY THE STACK
self.stack1.setLayout(layout)
def stack2UI(self):
# INSTANTIATE THE UI 2 LAYOUT(S)
layout = QVBoxLayout()
# INSTANTIATE WIDGETS
note_str = 'IMPORTANT: When using the fast travel utility:\n'\
'- Unstable utility; use at your own risk.\n'\
'- Aircraft must be in flight.\n'\
'- Be mindful of the altitude; sometimes it will place you way too close to the ground.\n'\
'- Proper LAT/LON inputs are floats, e.g. 43.63100084864455 -79.38926956621822\n'\
'- Be mindful that you do not fast travel yourself into a building or mountain.\n'\
"- Be mindful of your destination's weather conditions. Do not fast travel into a hurricane.\n"\
'- Flight time total will be lost (will not account for time it would have taken to fly).'
text_note_label = QLabel(note_str)
text_input_lat = QLineEdit()
text_input_lon = QLineEdit()
text_input_alt = QLineEdit()
start_push_button = QPushButton('GO')
start_push_button.setStyleSheet("background-color: #FFFF00")
# ADD WIDGETS TO LAYOUT
layout.addWidget(text_note_label)
layout.addWidget(QLabel("LAT:"))
layout.addWidget(text_input_lat)
layout.addWidget(QLabel("LON:"))
layout.addWidget(text_input_lon)
layout.addWidget(QLabel("ALT (FEET ABOVE GROUND):"))
layout.addWidget(text_input_alt)
layout.addWidget(start_push_button)
# INSTANTIATE METHODS OF THE STACK
def go_fast_travel(self):
fast_lat = text_input_lat.text()
fast_lon = text_input_lon.text()
fast_alt = text_input_alt.text()
if fast_lat != '':
self._AQ.set("PLANE_LATITUDE", float(fast_lat))
if fast_lon != '':
self._AQ.set("PLANE_LONGITUDE", float(fast_lon))
if fast_alt != '':
self._AQ.set("PLANE_ALT_ABOVE_GROUND", float(fast_alt))
return
# CONNECT METHODS TO WIDGETS
start_push_button.clicked.connect(lambda checked: go_fast_travel(self))
# APPLY THE STACK
self.stack2.setLayout(layout)
def stack3UI(self):
# INSTANTIATE THE UI 3 LAYOUT(S)
layout = QVBoxLayout()
# INSTANTIATE WIDGETS
start_push_button = QPushButton('REPAIR_AND_REFUEL')
monitor_label = QLabel('*** Fully repair and refuel current aircraft')
start_push_button.setStyleSheet("background-color: #64c851") # GREEN
# ADD WIDGETS TO LAYOUT
layout.addWidget(monitor_label)
layout.addWidget(start_push_button)
# INSTANTIATE METHODS OF THE STACK
def do_repair_and_refuel(self):
event_to_trigger = self._AE.find("REPAIR_AND_REFUEL")
event_to_trigger()
return
# CONNECT METHODS TO WIDGETS
start_push_button.clicked.connect(lambda checked: do_repair_and_refuel(self))
# APPLY THE STACK
self.stack3.setLayout(layout)
def stack4UI(self):
# INSTANTIATE THE UI 4 LAYOUT(S)
layout = QVBoxLayout()
# INSTANTIATE WIDGETS
cb = QApplication.clipboard()
start_push_button = QPushButton('Start ANG Sim Dashboard')
stop_push_button = QPushButton('Stop ANG Sim Dashboard')
copy_to_clip_button = QPushButton('Copy Lat/long to Clipboard')
start_push_button.setStyleSheet("background-color: #64c851")
stop_push_button.setStyleSheet("background-color: #f42525")
copy_to_clip_button.setStyleSheet("background-color: #ff0000")
lat_label = QLabel('...')
lon_label = QLabel('...')
alt_label = QLabel('...')
dis_to_targ = QLabel('...')
monitor_label = QLabel('...')
my_auto_pilot_button = QPushButton('AUTO HOLD')
# ADD WIDGETS TO LAYOUT
layout.addWidget(start_push_button)
layout.addWidget(stop_push_button)
layout.addWidget(QLabel('PLANE_LATITUDE:'))
layout.addWidget(lat_label)
layout.addWidget(QLabel('PLANE_LONGITUDE:'))
layout.addWidget(lon_label)
layout.addWidget(copy_to_clip_button)
layout.addWidget(QLabel('ALTITUDE (FEET FROM GROUND):'))
layout.addWidget(alt_label)
layout.addWidget(QLabel('GPS_WP_DISTANCE (KILOMETERS):'))
layout.addWidget(dis_to_targ)
# CHANGE STYLES
font_size_ = 40
lat_label.setFont(QFont('Consolas', font_size_))
lat_label.setStyleSheet("background-color: red; border: 1px solid black;")
lon_label.setFont(QFont('Consolas', font_size_))
lon_label.setStyleSheet("background-color: red; border: 1px solid black;")
alt_label.setFont(QFont('Consolas', font_size_))
alt_label.setStyleSheet("background-color: red; border: 1px solid black;")
dis_to_targ.setFont(QFont('Consolas', font_size_))
dis_to_targ.setStyleSheet("background-color: red; border: 1px solid black;")
layout.addWidget(QLabel('SIM RATE:'))
layout.addWidget(monitor_label)
monitor_label.setFont(QFont('Consolas', font_size_))
monitor_label.setStyleSheet("background-color: red; border: 1px solid black;")
timer = QTimer()
stop_push_button.hide()
# INSTANTIATE METHODS OF THE STACK
def radians_to_degrees(rads):
'''
Function to convert Radians to Degrees.
Parameters
----------
rads : Float
Radians.
Returns
-------
degrees : float
Degrees.
'''
try:
degrees = rads * 180/math.pi
except TypeError:
degrees = -99999999999999.99
return degrees
def update_lat():
'''
Function updates Dashboard display latitude.
Returns
-------
None.
'''
lat = self._AQ.get("PLANE_LATITUDE")
lat_label.setText(str(lat))
return
def update_lon():
'''
Function updates Dashboard display longitude.
Returns
-------
None.
'''
lon = self._AQ.get("PLANE_LONGITUDE")
lon_label.setText(str(lon))
return
def update_alt_ground():
'''
Function updates Dashboard display altitude from ground.
Returns
-------
None.
'''
alt = self._AQ.get("PLANE_ALT_ABOVE_GROUND")
alt_label.setText(str(alt))
return
def update_dist_to_targ():
'''
Function updates Dashboard distance to next waypoint.
Returns
-------
None.
'''
try:
dis = self._AQ.get("GPS_WP_DISTANCE")
dis_in_k = dis / 1000
dis_to_targ.setText(str(dis_in_k))
except TypeError:
dis = -99999999999999.99
return
def copy_lat_long_to_clip(self):
'''
Function copies displayed latitude and longitude to clipboard.
Returns
-------
None.
'''
string_to_copy = f'{lat_label.text()}, {lon_label.text()}'
cb.clear(mode=cb.Clipboard )
cb.setText(string_to_copy, mode=cb.Clipboard)
return
def start_sim_rate_mon(self):
'''
Function monitors sim rate and other variables and displays in the
app. Refreshes every 2.5 seconds.
Returns
-------
None.
'''
try:
if self.switch == 1: # On/Off proper connect to SimConnect
timer.start(2500)
# print("MONITOR START")
start_push_button.hide()
stop_push_button.show()
else:
stop_sim_rate_mon(self)
msg = QMessageBox()
msg.setIcon(QMessageBox.Critical)
msg.setWindowTitle("MSFS Not Detected...")
msg.setText("Connection Error. Microsoft Flight Simulator Must Be Running.")
x = msg.exec_()
except OSError:
timer.stop()
monitor_label.setText('...')
msg = QMessageBox()
msg.setIcon(QMessageBox.Critical)
msg.setWindowTitle("MSFS Not Detected...")
msg.setText("Microsoft Flight Simulator Must Be Running.")
x = msg.exec_()
return
def stop_sim_rate_mon(self):
'''
Function will cease dashboard display and communication with Sim
Connect--connected to stop_push_button.
Returns
-------
None.
'''
timer.stop()
print("MONITOR STOP")
start_push_button.show()
stop_push_button.hide()
monitor_label.setText('...')
lat_label.setText('...')
lon_label.setText('...')
alt_label.setText('...')
dis_to_targ.setText('...')
return
def every_second_while_pressed():
'''
Function runs every 2.5 seconds and updates ANG Sim Dashboard--
connected to start_push_button.
Returns
-------
None.
'''
try:
monitor_label.setText(str(self._AQ.get("SIMULATION_RATE")))
update_lat()
update_lon()
update_alt_ground()
update_dist_to_targ()
except OSError:
timer.stop()
monitor_label.setText('...')
msg = QMessageBox()
msg.setIcon(QMessageBox.Critical)
msg.setWindowTitle("MSFS Not Detected...")
msg.setText("Microsoft Flight Simulator Must Be Running.")
x = msg.exec_()
return
# CONNECT METHODS TO WIDGETS
start_push_button.clicked.connect(lambda checked: start_sim_rate_mon(self))
stop_push_button.clicked.connect(lambda checked: stop_sim_rate_mon(self))
copy_to_clip_button.clicked.connect(lambda checked: copy_lat_long_to_clip(self))
timer.timeout.connect(every_second_while_pressed)
# APPLY THE STACK
self.stack4.setLayout(layout)
def display(self,i):
self.Stack.setCurrentIndex(i)
def closeEvent(self, event):
'''
Function ensures the worker thread is closed on close of the application.
Parameters
----------
event : Close Event.
Is applied by app on close.
Returns
-------
None.
'''
print("Window closing...")
# Perform cleanup tasks here
print("Killing Thread...")
if self.worker_true:
self.worker.running = False
def main():
app = QApplication(sys.argv)
app.setStyle('Fusion')
ex = SimUtilsApp()
sys.exit(app.exec_())
if __name__ == '__main__':
main()
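The worker above follows a common pattern: a loop that polls on an interval, can be paused, and checks a flag so it can be stopped when the window closes. The sketch below shows the same pattern with the standard library's threading module instead of QRunnable, so it runs without PyQt5 or the simulator. PollingWorker and its poll callback are hypothetical names for illustration. One difference from the code above is deliberate: threading.Event.wait() is used as an interruptible sleep, so stop() takes effect immediately rather than after the current sleep finishes.

```python
import threading
import time


class PollingWorker:
    """Poll on an interval until stopped; skip the poll while paused."""

    def __init__(self, poll, interval=0.01):
        self._poll = poll
        self._interval = interval
        self._stop = threading.Event()
        self.paused = False
        self.messages = []

    def run(self):
        # wait() returns True as soon as stop() is called, ending the loop.
        while not self._stop.wait(self._interval):
            if self.paused:
                self.messages.append("RECORD PAUSED.")
                continue
            self.messages.append(self._poll())

    def stop(self):
        self._stop.set()


worker = PollingWorker(poll=lambda: "WAITING FOR FLIGHT...")
thread = threading.Thread(target=worker.run)
thread.start()
time.sleep(0.2)  # Let the loop run a few iterations.
worker.stop()
thread.join(timeout=2.0)
print(len(worker.messages), "messages; thread alive:", thread.is_alive())
```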
ANG_Flight_Recorder_v_0_5.py
The recorder functions were the easiest thing to do. Essentially, the module is a series of getter functions.
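Since every getter does the same thing, the pattern can be sketched as a loop over variable names. This is a compact illustration rather than the module's actual layout, which spells out each variable on its own line. SIM_VARS is a short illustrative subset, record_sample is a hypothetical helper, and FakeRequests stands in for the SimConnect request object so the snippet runs without the simulator.

```python
# Illustrative subset of the simulation variables recorded per sample.
SIM_VARS = ["AIRSPEED_TRUE", "PLANE_LATITUDE", "PLANE_LONGITUDE", "PLANE_ALTITUDE"]


class FakeRequests:
    """Hypothetical stand-in for the SimConnect request object (assumption)."""

    def __init__(self, values):
        self._values = values

    def get(self, name):
        return self._values.get(name)


def record_sample(flight_dict, aq, names=SIM_VARS):
    """Append one reading per variable to the matching list in flight_dict."""
    for name in names:
        flight_dict.setdefault(name, []).append(aq.get(name))
    return flight_dict


aq = FakeRequests({"AIRSPEED_TRUE": 120.0, "PLANE_LATITUDE": 43.63,
                   "PLANE_LONGITUDE": -79.39, "PLANE_ALTITUDE": 1500.0})
flight = {}
record_sample(flight, aq)
record_sample(flight, aq)
print(flight["AIRSPEED_TRUE"])
```

The trade-off is that per-variable comments about units move out of the code and into the list of names, so the explicit form may still be easier to annotate.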
# -*- coding: utf-8 -*-
"""
Created on Thu Apr 18 11:52:44 2024
@author: ANG
"""
# IMPORTS
import math
import os
import glob
import pickle
import pytz
import timezonefinder
import shutil
from datetime import datetime
from SimConnect import SimConnect, AircraftEvents, AircraftRequests
def connect_sm():
# Create SimConnect link
_SM = SimConnect()
return _SM
def connect_aq(_SM):
# Note the default _time is 2000 to be refreshed every 2 seconds
_AQ = AircraftRequests(_SM, _time=2000)
return _AQ
def connect_ae(_SM):
_AE = AircraftEvents(_SM)
return _AE
def connect_tf():
_TF = timezonefinder.TimezoneFinder()
return _TF
'''
_SM = connect_sm()
_AQ = connect_aq(_SM)
_AE = connect_ae(_SM)
_TF = connect_tf()
'''
def check_test_data_dir():
print('Check data dir exist...')
os.makedirs('data', exist_ok=True)
return
def check_test_csv_data_dirs():
print('Check data csv dirs exist...')
os.makedirs('data_csv', exist_ok=True)
os.makedirs('data_csv/flight_data', exist_ok=True)
os.makedirs('data_csv/flight_headers', exist_ok=True)
return
def radians_to_degrees(rads):
'''
Function to convert Radians to Degrees.
Parameters
----------
rads : Float
Radians.
Returns
-------
degrees : float
Degrees.
'''
degrees = rads * 180/math.pi
return degrees
def get_last_flight_num():
'''
Function gets the last flight num from the ./data/ directory.
Returns
-------
latest_file : String.
Gets last flight number from ./data/.
'''
list_of_files = glob.glob('./data/*') # * means all if need specific format then *.csv
latest_file = None
if len(list_of_files) == 0:
# latest_file = '000000'
latest_file = 'f0'
else:
latest_file = os.path.basename(max(list_of_files, key=os.path.getctime))  # Newest entry; basename works with either path separator
return latest_file
def check_flight_number(str_flight_num):
list_of_files = glob.glob('./data/*') # * means all if need specific format then *.csv
curr_flight_nums_in_dir = []
for i in list_of_files:
curr_flight_nums_in_dir.append(os.path.basename(i))
if str_flight_num in curr_flight_nums_in_dir:
is_in_dir = True
else:
is_in_dir = False
return is_in_dir
def get_local_time_stamp(_AQ, _TF):
'''
Function gets the local time stamp based on the current flight's latitude
and longitude.
Returns
-------
timestamp: String
String timestamp.
'''
long = _AQ.get("PLANE_LONGITUDE")
lat = _AQ.get("PLANE_LATITUDE")
try:
if type(lat) != float:
lat = _AQ.get("PLANE_LATITUDE")  # Retry the latitude read
if type(long) != float:
long = _AQ.get("PLANE_LONGITUDE")  # Retry the longitude read
timezone_str = _TF.certain_timezone_at(lat=round(lat,10),
lng=round(long,10))
timezone = pytz.timezone(timezone_str)
dt = datetime.utcnow()
timestamp = dt + timezone.utcoffset(dt)
except Exception:
print("Retrying time stamp.")
timestamp = get_local_time_stamp(_AQ,_TF)
return timestamp
def get_start_flight_data(_AQ, _TF, fnum):
'''
Data here is used to build the header data for the flight.
Returns
-------
header_dict : Dict
Header values captured at the start of the flight.
'''
header_dict = {"LOCAL_TIME":get_local_time_stamp(_AQ, _TF),
"ANG_FLIGHT_NUMBER":fnum,
"ATC_FLIGHT_NUMBER":_AQ.get("ATC_FLIGHT_NUMBER"),
"ATC_TYPE":_AQ.get("ATC_TYPE"),
"ATC_MODEL":_AQ.get("ATC_MODEL"),
"TOTAL_WEIGHT":_AQ.get("TOTAL_WEIGHT"), # In Pounds
"ENGINE_TYPE":_AQ.get("ENGINE_TYPE"),
"NUMBER_OF_ENGINES":_AQ.get("NUMBER_OF_ENGINES"),
"PLANE_LATITUDE":_AQ.get("PLANE_LATITUDE"),
"PLANE_LONGITUDE":_AQ.get("PLANE_LONGITUDE"),
"PLANE_ALTITUDE":_AQ.get("PLANE_ALTITUDE"),
"PLANE_ALT_ABOVE_GROUND":_AQ.get("PLANE_ALT_ABOVE_GROUND"), # Feet from ground
"DESTINATION_LAT":_AQ.get("GPS_WP_NEXT_LAT"),
"DESTINATION_LON":_AQ.get("GPS_WP_NEXT_LON"),
"DESTINATION_ALT":_AQ.get("GPS_WP_NEXT_ALT"),
"FUEL_TOTAL_QUANTITY":_AQ.get("FUEL_TOTAL_QUANTITY") # In Gallons
}
return header_dict
def get_flight_data(flight_dict, _AQ, _TF):
'''
Data here is monitored throughout the flight and stored
in a dictionary.
Returns
-------
None.
'''
flight_dict["LOCAL_TIME"].append(get_local_time_stamp(_AQ, _TF))
flight_dict["AIRSPEED_TRUE"].append(_AQ.get("AIRSPEED_TRUE")) # In Knots
flight_dict["GROUND_VELOCITY"].append(_AQ.get("GROUND_VELOCITY")) # In Knots
flight_dict["PLANE_LATITUDE"].append(_AQ.get("PLANE_LATITUDE")) # In Degrees; North is positive, South negative
flight_dict["PLANE_LONGITUDE"].append(_AQ.get("PLANE_LONGITUDE")) # In Degrees; East is positive, West negative
flight_dict["PLANE_ALTITUDE"].append(_AQ.get("PLANE_ALTITUDE")) # Feet from sea level
flight_dict["PLANE_ALT_ABOVE_GROUND"].append(_AQ.get("PLANE_ALT_ABOVE_GROUND")) # Feet from ground
flight_dict["AMBIENT_WIND_VELOCITY"].append(_AQ.get("AMBIENT_WIND_VELOCITY")) # In knots
flight_dict["AMBIENT_WIND_DIRECTION"].append(_AQ.get("AMBIENT_WIND_DIRECTION")) # In degrees
flight_dict["AMBIENT_WIND_X"].append(_AQ.get("AMBIENT_WIND_X")) # Wind component in East/West direction. Meters per sec.
flight_dict["AMBIENT_WIND_Y"].append(_AQ.get("AMBIENT_WIND_Y")) # Wind component in vertical direction. Meters per sec.
flight_dict["AMBIENT_WIND_Z"].append(_AQ.get("AMBIENT_WIND_Z")) # Wind component in North/South direction. Meters per sec.
flight_dict["AIRCRAFT_WIND_X"].append(_AQ.get("AIRCRAFT_WIND_X")) # Wind component in aircraft lateral axis
flight_dict["AIRCRAFT_WIND_Y"].append(_AQ.get("AIRCRAFT_WIND_Y")) # Wind component in aircraft vertical axis
flight_dict["AIRCRAFT_WIND_Z"].append(_AQ.get("AIRCRAFT_WIND_Z")) # Wind component in aircraft longitudinal axis
flight_dict["AMBIENT_VISIBILITY"].append(_AQ.get("AMBIENT_VISIBILITY")) # In Meters
flight_dict["AMBIENT_TEMPERATURE"].append(_AQ.get("AMBIENT_TEMPERATURE")) # Celsius
flight_dict["BAROMETER_PRESSURE"].append(_AQ.get("BAROMETER_PRESSURE")) # Determines air density, which impacts lift and engine performance
flight_dict["AILERON_LEFT_DEFLECTION"].append(_AQ.get("AILERON_LEFT_DEFLECTION")) # In Radians
flight_dict["AILERON_RIGHT_DEFLECTION"].append(_AQ.get("AILERON_RIGHT_DEFLECTION")) # In Radians
flight_dict["ANGLE_OF_ATTACK_INDICATOR"].append(_AQ.get("ANGLE_OF_ATTACK_INDICATOR")) # In Radians
flight_dict["GPS_WP_TRUE_BEARING"].append(_AQ.get("GPS_WP_TRUE_BEARING")) # In Radians
flight_dict["GPS_WP_DISTANCE"].append(_AQ.get("GPS_WP_DISTANCE")) # In Meters
flight_dict["ELEVATOR_TRIM_POSITION"].append(_AQ.get("ELEVATOR_TRIM_POSITION")) # In Radians
flight_dict["FLAPS_HANDLE_PERCENT"].append(_AQ.get("FLAPS_HANDLE_PERCENT")) # Percent Over 100
flight_dict["HEADING_INDICATOR"].append(_AQ.get("HEADING_INDICATOR")) # In Radians
flight_dict["PLANE_PITCH_DEGREES"].append(_AQ.get("PLANE_PITCH_DEGREES")) # In Radians; mentions degrees in err
flight_dict["PLANE_BANK_DEGREES"].append(_AQ.get("PLANE_BANK_DEGREES")) # In Radians; mentions degrees in err
flight_dict["RUDDER_POSITION"].append(_AQ.get("RUDDER_POSITION")) # Percent rudder input deflection
flight_dict["VERTICAL_SPEED"].append(_AQ.get("VERTICAL_SPEED")) # In feet/minute
flight_dict["G_FORCE"].append(_AQ.get("G_FORCE"))
flight_dict["FUEL_TOTAL_QUANTITY"].append(_AQ.get("FUEL_TOTAL_QUANTITY")) # In Gallons
flight_dict["GENERAL_ENG_THROTTLE_LEVER_POSITION:1"].append(_AQ.get("GENERAL_ENG_THROTTLE_LEVER_POSITION:1")) # Percent of max throttle position
flight_dict["GENERAL_ENG_THROTTLE_LEVER_POSITION:2"].append(_AQ.get("GENERAL_ENG_THROTTLE_LEVER_POSITION:2"))
flight_dict["GENERAL_ENG_THROTTLE_LEVER_POSITION:3"].append(_AQ.get("GENERAL_ENG_THROTTLE_LEVER_POSITION:3"))
flight_dict["GENERAL_ENG_THROTTLE_LEVER_POSITION:4"].append(_AQ.get("GENERAL_ENG_THROTTLE_LEVER_POSITION:4"))
flight_dict["PROP_THRUST:1"].append(_AQ.get("PROP_THRUST:1")) # In Pounds
flight_dict["PROP_THRUST:2"].append(_AQ.get("PROP_THRUST:2"))
flight_dict["PROP_THRUST:3"].append(_AQ.get("PROP_THRUST:3"))
flight_dict["PROP_THRUST:4"].append(_AQ.get("PROP_THRUST:4"))
flight_dict["GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:1"].append(_AQ.get("GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:1")) # In Rankine
flight_dict["GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:2"].append(_AQ.get("GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:2"))
flight_dict["GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:3"].append(_AQ.get("GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:3"))
flight_dict["GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:4"].append(_AQ.get("GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:4"))
flight_dict["GENERAL_ENG_FUEL_PRESSURE:1"].append(_AQ.get("GENERAL_ENG_FUEL_PRESSURE:1")) # In Psi
flight_dict["GENERAL_ENG_FUEL_PRESSURE:2"].append(_AQ.get("GENERAL_ENG_FUEL_PRESSURE:2"))
flight_dict["GENERAL_ENG_FUEL_PRESSURE:3"].append(_AQ.get("GENERAL_ENG_FUEL_PRESSURE:3"))
flight_dict["GENERAL_ENG_FUEL_PRESSURE:4"].append(_AQ.get("GENERAL_ENG_FUEL_PRESSURE:4"))
flight_dict["ENG_FUEL_FLOW_GPH:1"].append(_AQ.get("ENG_FUEL_FLOW_GPH:1")) # In Gallons Per Hour
flight_dict["ENG_FUEL_FLOW_GPH:2"].append(_AQ.get("ENG_FUEL_FLOW_GPH:2"))
flight_dict["ENG_FUEL_FLOW_GPH:3"].append(_AQ.get("ENG_FUEL_FLOW_GPH:3"))
flight_dict["ENG_FUEL_FLOW_GPH:4"].append(_AQ.get("ENG_FUEL_FLOW_GPH:4"))
flight_dict["TURB_ENG_VIBRATION:1"].append(_AQ.get("TURB_ENG_VIBRATION:1")) # Number/Float
flight_dict["TURB_ENG_VIBRATION:2"].append(_AQ.get("TURB_ENG_VIBRATION:2"))
flight_dict["TURB_ENG_VIBRATION:3"].append(_AQ.get("TURB_ENG_VIBRATION:3"))
flight_dict["TURB_ENG_VIBRATION:4"].append(_AQ.get("TURB_ENG_VIBRATION:4"))
flight_dict["GENERAL_ENG_OIL_PRESSURE:1"].append(_AQ.get("GENERAL_ENG_OIL_PRESSURE:1")) # In Psf
flight_dict["GENERAL_ENG_OIL_PRESSURE:2"].append(_AQ.get("GENERAL_ENG_OIL_PRESSURE:2"))
flight_dict["GENERAL_ENG_OIL_PRESSURE:3"].append(_AQ.get("GENERAL_ENG_OIL_PRESSURE:3"))
flight_dict["GENERAL_ENG_OIL_PRESSURE:4"].append(_AQ.get("GENERAL_ENG_OIL_PRESSURE:4"))
flight_dict["GENERAL_ENG_RPM:1"].append(_AQ.get("GENERAL_ENG_RPM:1"))
flight_dict["GENERAL_ENG_RPM:2"].append(_AQ.get("GENERAL_ENG_RPM:2"))
flight_dict["GENERAL_ENG_RPM:3"].append(_AQ.get("GENERAL_ENG_RPM:3"))
flight_dict["GENERAL_ENG_RPM:4"].append(_AQ.get("GENERAL_ENG_RPM:4"))
flight_dict["FUEL_TANK_RIGHT_MAIN_QUANTITY"].append(_AQ.get("FUEL_TANK_RIGHT_MAIN_QUANTITY")) # In Gallons
flight_dict["FUEL_TANK_LEFT_MAIN_QUANTITY"].append(_AQ.get("FUEL_TANK_LEFT_MAIN_QUANTITY")) # In Gallons
flight_dict["FUEL_TOTAL_QUANTITY_WEIGHT"].append(_AQ.get("FUEL_TOTAL_QUANTITY_WEIGHT")) # In Pounds
flight_dict["STALL_WARNING"].append(_AQ.get("STALL_WARNING"))
flight_dict["OVERSPEED_WARNING"].append(_AQ.get("OVERSPEED_WARNING"))
# flight_dict[""].append(_AQ.get(""))
return flight_dict
def get_flight_dictionary():
    '''
    Function creates an empty flight data dictionary.
    Returns
    -------
    flight_dict : Dictionary
        A flight dictionary with one empty list per recorded flight data field.
    '''
    flight_dict = {"LOCAL_TIME":[],
"PLANE_LATITUDE":[],
"PLANE_LONGITUDE":[],
"PLANE_ALTITUDE":[],
"PLANE_ALT_ABOVE_GROUND":[],
"AMBIENT_WIND_VELOCITY":[],
"AMBIENT_WIND_DIRECTION":[],
"AMBIENT_WIND_X":[],
"AMBIENT_WIND_Y":[],
"AMBIENT_WIND_Z":[],
"AIRCRAFT_WIND_X":[],
"AIRCRAFT_WIND_Y":[],
"AIRCRAFT_WIND_Z":[],
"AMBIENT_VISIBILITY":[],
"AMBIENT_TEMPERATURE":[],
"BAROMETER_PRESSURE":[],
"AILERON_LEFT_DEFLECTION":[],
"AILERON_RIGHT_DEFLECTION":[],
"ANGLE_OF_ATTACK_INDICATOR":[],
"AIRSPEED_TRUE":[],
"GROUND_VELOCITY":[],
"GPS_WP_TRUE_BEARING":[],
"GPS_WP_DISTANCE":[],
"ELEVATOR_TRIM_POSITION":[],
"FLAPS_HANDLE_PERCENT":[],
"HEADING_INDICATOR":[],
"PLANE_PITCH_DEGREES":[],
"PLANE_BANK_DEGREES":[],
"RUDDER_POSITION":[],
"VERTICAL_SPEED":[],
"G_FORCE":[],
"FUEL_TOTAL_QUANTITY":[],
"GENERAL_ENG_THROTTLE_LEVER_POSITION:1":[],
"GENERAL_ENG_THROTTLE_LEVER_POSITION:2":[],
"GENERAL_ENG_THROTTLE_LEVER_POSITION:3":[],
"GENERAL_ENG_THROTTLE_LEVER_POSITION:4":[],
"PROP_THRUST:1":[],
"PROP_THRUST:2":[],
"PROP_THRUST:3":[],
"PROP_THRUST:4":[],
"GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:1":[],
"GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:2":[],
"GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:3":[],
"GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:4":[],
"GENERAL_ENG_FUEL_PRESSURE:1":[],
"GENERAL_ENG_FUEL_PRESSURE:2":[],
"GENERAL_ENG_FUEL_PRESSURE:3":[],
"GENERAL_ENG_FUEL_PRESSURE:4":[],
"ENG_FUEL_FLOW_GPH:1":[],
"ENG_FUEL_FLOW_GPH:2":[],
"ENG_FUEL_FLOW_GPH:3":[],
"ENG_FUEL_FLOW_GPH:4":[],
"TURB_ENG_VIBRATION:1":[],
"TURB_ENG_VIBRATION:2":[],
"TURB_ENG_VIBRATION:3":[],
"TURB_ENG_VIBRATION:4":[],
"GENERAL_ENG_OIL_PRESSURE:1":[],
"GENERAL_ENG_OIL_PRESSURE:2":[],
"GENERAL_ENG_OIL_PRESSURE:3":[],
"GENERAL_ENG_OIL_PRESSURE:4":[],
"GENERAL_ENG_RPM:1":[],
"GENERAL_ENG_RPM:2":[],
"GENERAL_ENG_RPM:3":[],
"GENERAL_ENG_RPM:4":[],
"FUEL_TANK_RIGHT_MAIN_QUANTITY":[],
"FUEL_TANK_LEFT_MAIN_QUANTITY":[],
"FUEL_TOTAL_QUANTITY_WEIGHT":[],
"STALL_WARNING":[],
"OVERSPEED_WARNING":[],
# "":[],
    }
    return flight_dict
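def update_flight_dict(flight_dict, _AQ, _TF):
    '''
    Function updates the flight data dict with one new sample per field.
    Parameters
    ----------
    flight_dict : Dictionary
        Flight data dictionary.
    _AQ : SimConnect Aircraft Requests object.
        Used to read values from MSFS.
    _TF : Timezone Finder object.
        Passed through to get_flight_data.
    Returns
    -------
    updated_dict : Dictionary
        Updated flight data dictionary.
    '''
    updated_dict = get_flight_data(flight_dict, _AQ, _TF)
    return updated_dict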
def save_data(SomeData, str_dir, str_file_name):
    '''
    Function pickles the given data to ./data/<str_dir>/<str_file_name>.pkl.
    Parameters
    ----------
    SomeData : *
        Data to be pickled.
    str_dir : String
        Name of the flight directory inside ./data, e.g. f1.
    str_file_name : String
        File name without the .pkl extension.
    Returns
    -------
    None.
    '''
    with open(f'./data/{str_dir}/{str_file_name}.pkl', 'wb') as fp:
        pickle.dump(SomeData, fp)
    return
def load_data(some_pickle_file_path_str):
    '''
    Function loads pickled data given a string filepath.
    Parameters
    ----------
    some_pickle_file_path_str : String
        String path to pickle file.
    Returns
    -------
    the_data : *
        The given pickled data.
    '''
    with open(some_pickle_file_path_str, 'rb') as fp:
        the_data = pickle.load(fp)
    return the_data
def get_atc_flight_number(_AQ):
    '''
    Function retrieves the flight number of an active flight. If there is no
    active flight, the function keeps polling until a flight number is assigned.
    Parameters
    ----------
    _AQ : SimConnect Aircraft Requests object.
        Used to connect to MSFS.
    Returns
    -------
    flight_num_str : String
        ATC Flight number.
    '''
    flight_num_str = None
    while flight_num_str is None:
        flight_num_str = _AQ.get("ATC_FLIGHT_NUMBER")
        if flight_num_str is not None:
            # The value arrives as bytes; decode it to a string
            flight_num_str = flight_num_str.decode()
    return flight_num_str
def get_flight_num():
    '''
    Function returns the last flight number found in the data directory.
    Returns
    -------
    flight_num : String
        Last flight number, as returned by get_last_flight_num().
    '''
    flight_num = get_last_flight_num()
    # flight_num = flight_num + 1
    # _AQ.set("ATC_FLIGHT_NUMBER", str.encode(flight_num))
    return flight_num
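def clear_flight_num(_AQ):
    '''
    Function resets ATC_FLIGHT_NUMBER in the simulator to a standby placeholder.
    Parameters
    ----------
    _AQ : SimConnect Aircraft Requests object.
    Returns
    -------
    None.
    '''
    _AQ.set("ATC_FLIGHT_NUMBER", str.encode('STANDBY_FOR_FLIGHT_NUMBER'))
    return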
def make_flight_data_dir(_AQ):
    '''
    Makes a new directory in ./data for the next sequential flight number
    (last flight number + 1, e.g. f3 after f2).
    Returns
    -------
    None.
    '''
    flight_number = int(get_last_flight_num()[1:])+1
    flight_number_f = f'f{flight_number}'
    os.mkdir(f'./data/{flight_number_f}')
    return
def make_flight_header(_AQ, _TF):
    '''
    Function creates the directory for the new flight, collects the start of
    flight data, and saves it as the flight header.
    Parameters
    ----------
    _AQ : SimConnect Aircraft Requests object.
    _TF : Timezone Finder object.
    Returns
    -------
    start_flight_data : *
        Flight header data returned by get_start_flight_data.
    '''
    # Make the directory for the new flight
    print("Making flight data directory...")
    make_flight_data_dir(_AQ)
    print("Setting flight number...")
    flight_num = get_last_flight_num() # Picks up the directory created just above
    start_flight_data = get_start_flight_data(_AQ, _TF, flight_num)
    save_data(start_flight_data, flight_num, f'{flight_num}_Flight_Header') # Saves the header .pkl
    return start_flight_data
def check_set_last_dir():
    '''
    Function checks that the last flight directory contains a flight data
    .pkl file and removes the directory if it does not.
    Returns
    -------
    None.
    '''
    last_flight_dir = get_last_flight_num()
    file_path = f"./data/{last_flight_dir}/{last_flight_dir}.pkl"
    if os.path.exists(file_path):
        print("File exists")
    else:
        print(f'Flight data {file_path} does not exist. Removing...')
        shutil.rmtree(f"./data/{last_flight_dir}", ignore_errors=True)
    return
def active_record(flight_dictionary, _AQ, _TF, flight_num):
    '''
    Function records one sample of an active flight: it looks up the last
    flight number, appends the latest flight data to flight_dictionary, and
    saves the dictionary to that flight's directory.
    Parameters
    ----------
    flight_dictionary : Dictionary
        Flight data dictionary; one sample is appended per call.
    _AQ : SimConnect Aircraft Requests object.
    _TF : Timezone Finder object.
    flight_num : String
        Flight number string. Currently overwritten by get_last_flight_num().
    Returns
    -------
    flight_dictionary : Dictionary
        Flight data dictionary with the new sample appended.
    '''
    flight_num = get_last_flight_num()
    flight_dictionary = update_flight_dict(flight_dictionary, _AQ, _TF)
    save_data(flight_dictionary, flight_num, flight_num)
    return flight_dictionary
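The recorder names each flight directory sequentially (f1, f2, f3, ...) by adding one to the last flight number. get_last_flight_num is not shown in this listing, so the following is only a minimal sketch of how that step could work, assuming directory names of the form f<number>. next_flight_name is a hypothetical helper and not part of the recorder. It compares the numeric part rather than the text, so f10 counts as later than f9.

```python
import re

def next_flight_name(existing_names):
    """Return the next sequential flight name, e.g. 'f3' after 'f1' and 'f2'.

    Hypothetical helper for illustration only. Names that do not look
    like f<number> are ignored.
    """
    numbers = []
    for name in existing_names:
        match = re.fullmatch(r"f(\d+)", name)
        if match:
            numbers.append(int(match.group(1)))
    # Start at f1 when no flight has been recorded yet
    return f"f{max(numbers, default=0) + 1}"

print(next_flight_name(["f1", "f2", "f10", "notes"]))
```

In the recorder, the list of names would presumably come from the contents of ./data.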
ANG_flight_data_converter.py
# -*- coding: utf-8 -*-
"""
Created on Thu Oct 10 16:59:50 2024
@author: ANG
"""
import ang_data_reader_utils as angdru
import time
def main():
    help_str = "\nWelcome to ANG Flight Data Converter!\n"\
               "0. Show all Flights not converted to .csv\n"\
               "1. Convert flight to .csv\n"\
               "2. Convert flight header to .csv\n"\
               "3. Convert all flights to .csv\n"\
               "4. Convert all headers to .csv\n"\
               "5. Convert all flights and all headers not converted to .csv\n"\
               "6. Exit\n\n"
    print(help_str)
    while True:
        user_input = input("Enter your choice: ")
        if user_input == "0":
            angdru.show_all_flights_and_headers_not_converted()
        elif user_input == "1":
            user_in_flight_num = input("Enter flight number to convert to .csv i.e. f1: ")
            angdru.convert_single_flight_to_csv(user_in_flight_num)
        elif user_input == "2":
            user_in_flight_num = input("Enter flight header number to convert to .csv i.e. f1: ")
            angdru.convert_single_header_to_csv(user_in_flight_num)
        elif user_input == "3":
            angdru.show_all_flights_and_headers_not_converted()
            user_cont_0 = input('The flights from the above table will be converted to .csv. Continue [y n]:')
            if user_cont_0 == 'n':
                print('Not converting... continue...')
                continue
            elif user_cont_0 == 'y':
                angdru.export_all_flights_to_csv()
        elif user_input == "4":
            angdru.show_all_flights_and_headers_not_converted()
            user_cont_0 = input('The flight headers from the above table will be converted to .csv. Continue [y n]:')
            if user_cont_0 == 'n':
                continue
            elif user_cont_0 == 'y':
                angdru.export_all_headers_to_csv()
            else:
                continue
        elif user_input == "5":
            angdru.show_all_flights_and_headers_not_converted()
            user_cont_0 = input('The flights and headers from the table above will be converted to .csv. Continue [y n]:')
            if user_cont_0 == 'n':
                print('Not converting... continue...')
                continue
            elif user_cont_0 == 'y':
                angdru.export_all_headers_to_csv()
                angdru.export_all_flights_to_csv()
            else:
                continue
        elif user_input == "6":
            print("Exiting the application. Goodbye!")
            time.sleep(2)
            break
        else:
            # Handle invalid input
            print(help_str)
            print("Please enter a valid input [0 1 2 3 4 5 6]")
if __name__ == "__main__":
    main()
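Menu options 3, 4 and 5 above each repeat the same [y n] confirmation. A minimal sketch of how that check could be factored out is shown here, assuming only a "y" answer should proceed. confirm and its ask parameter are hypothetical names, not part of the converter, and unlike the menu above this sketch ignores case and surrounding spaces. ask defaults to the built-in input so the helper can also be exercised with canned answers.

```python
def confirm(prompt, ask=input):
    """Return True only when the answer is 'y' (case and spaces ignored)."""
    answer = ask(prompt).strip().lower()
    return answer == "y"

# Canned answers instead of keyboard input
print(confirm("Continue [y n]: ", ask=lambda _prompt: "Y "))
print(confirm("Continue [y n]: ", ask=lambda _prompt: "n"))
```

Any answer other than "y", including an empty one, is treated as "do not convert".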
ang_data_reader_utils.py
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 25 15:21:28 2024
@author: ANG
"""
import os
import pickle
from pandas import DataFrame
def test_check_data_dirs():
    os.makedirs('data_csv', exist_ok=True)
    os.makedirs('data_csv/flight_data', exist_ok=True)
    os.makedirs('data_csv/flight_headers', exist_ok=True)
    return
def load_data(some_pickle_file_path_str):
    '''
    Function loads pickled data given a string filepath.
    Parameters
    ----------
    some_pickle_file_path_str : String
        String path to pickle file.
    Returns
    -------
    the_data : *
        The given pickled data.
    '''
    with open(some_pickle_file_path_str, 'rb') as fp:
        the_data = pickle.load(fp)
    return the_data
def load_header(flight_num):
    data = load_data(f"./data/{flight_num}/{flight_num}_Flight_Header.pkl")
    return data
def load_flight_data(flight_num):
    data = load_data(f"./data/{flight_num}/{flight_num}.pkl")
    return data
def data_to_dataframe(data_dictionary):
    try:
        df = DataFrame(data_dictionary)
    except ValueError:
        # A dictionary of plain scalar values needs an explicit index;
        # build a single row and transpose it into one row per field
        df = DataFrame(data_dictionary, index=[0]).T
    return df
def show_all_flights_and_headers_pkl():
    # Header for the table
    print(f'{"Flights in ./data:":<30} {"Headers in ./data:"}')
    for root, dirs, files in os.walk('./data'): # iterates through all flight directories in ./data
        if len(files) < 2:
            continue
        else:
            # Assumes files[0] is the flight file and files[1] the header;
            # os.walk does not guarantee this order
            print(f'{files[0]:<30} {files[1]}')
    return
def get_csv_flight_nums():
    '''
    Function returns a list of string flight numbers that currently exist in
    ./data_csv/flight_data
    Returns
    -------
    current_csv_flight_nums : List
        List of string flight nums i.e. ['f1','f2','f3',...].
    '''
    current_csv_flight_nums = []
    for i in os.walk('./data_csv/flight_data'):
        for i2 in i[2]:
            current_csv_flight_nums.append(i2.split('.')[0])
    return current_csv_flight_nums
def get_csv_flight_headers():
    '''
    Function returns a list of string flight header numbers that currently
    exist in ./data_csv/flight_headers
    Returns
    -------
    current_csv_flight_headers : List
        List of string flight header nums i.e. ['f1','f2','f3',...].
    '''
    current_csv_flight_headers = []
    for i in os.walk('./data_csv/flight_headers'):
        for i2 in i[2]:
            current_csv_flight_headers.append(i2.split('_')[0])
    return current_csv_flight_headers
def get_all_headers_pkl():
    '''
    Gets all recorded headers in ./data
    Returns
    -------
    headers_lst : List
        List of string flight header nums i.e. ['f1','f2','f3',...].
    '''
    headers_lst = []
    for root, dirs, files in os.walk('./data'): # iterates through all flight directories in ./data
        if len(files) == 2:
            # Assumes files[1] is f#_Flight_Header.pkl; os.walk does not guarantee this order
            headers_lst.append(files[1].split('_')[0])
    return headers_lst
def get_all_flight_pkl():
    '''
    Gets all recorded flights in ./data
    Returns
    -------
    flights_lst : List
        List of string flight nums i.e. ['f1','f2','f3',...].
    '''
    flights_lst = []
    for root, dirs, files in os.walk('./data'): # iterates through all flight directories in ./data
        if len(files) == 2:
            # Assumes files[0] is f#.pkl; os.walk does not guarantee this order
            flights_lst.append(files[0].split('.')[0])
    return flights_lst
def check_convert_headers_to_csv():
    '''
    Function checks for all header numbers recorded in ./data that are not
    yet converted to .csv in ./data_csv/flight_headers
    Returns
    -------
    diff_lst : List
        List of string flight nums i.e. ['f1','f2','f3',...].
    '''
    current_csv_flight_headers_lst = get_csv_flight_headers()
    current_pkl_headers_lst = get_all_headers_pkl()
    diff_lst = [i for i in current_pkl_headers_lst if i not in current_csv_flight_headers_lst]
    return diff_lst
def check_convert_flights_to_csv():
    '''
    Function checks for all flight data numbers recorded in ./data that are not
    yet converted to .csv in ./data_csv/flight_data
    Returns
    -------
    diff_lst : List
        List of string flight nums i.e. ['f1','f2','f3',...].
    '''
    current_csv_flights_lst = get_csv_flight_nums()
    current_pkl_flights_lst = get_all_flight_pkl()
    diff_lst = [i for i in current_pkl_flights_lst if i not in current_csv_flights_lst]
    return diff_lst
def show_all_flights_and_headers_not_converted():
    print('\n\n---------------------------------')
    print('All flights and headers in ./data not converted to .csv')
    print(f'{"Flights:":<30} {"Headers:"}')
    flights_not_converted = check_convert_flights_to_csv()
    headers_not_converted = check_convert_headers_to_csv()
    if len(flights_not_converted) == 0:
        flights_not_converted = ['None to convert']
    if len(headers_not_converted) == 0:
        headers_not_converted = ['None to convert']
    # Pad the shorter column so both lists print side by side
    max_len = max(len(flights_not_converted), len(headers_not_converted))
    flights_not_converted += [''] * (max_len - len(flights_not_converted))
    headers_not_converted += [''] * (max_len - len(headers_not_converted))
    for flight, header in zip(flights_not_converted, headers_not_converted):
        print(f'{flight:<30} {header}')
    print('---------------------------------')
    return
def convert_single_flight_to_csv(flight_num_str):
    if flight_num_str in check_convert_flights_to_csv():
        print(f'Converting flight {flight_num_str} to csv...')
        data_to_dataframe(load_flight_data(flight_num_str)).to_csv(f'./data_csv/flight_data/{flight_num_str}.csv', index=False)
    else:
        print(f"Flight {flight_num_str} already converted or does not exist. ")
    return
def convert_single_header_to_csv(flight_num_str):
    if flight_num_str in check_convert_headers_to_csv():
        print(f'Converting flight header {flight_num_str} to csv...')
        data_to_dataframe(load_header(flight_num_str)).to_csv(f'./data_csv/flight_headers/{flight_num_str}_Flight_Header.csv')
    else:
        print(f"Flight header {flight_num_str} already converted or does not exist. ")
    return
def export_all_flights_to_csv():
    '''
    Function converts and exports all flight data from .pkl in ./data
    to .csv in ./data_csv/flight_data if it does not already exist there.
    Returns
    -------
    None.
    '''
    # Make sure the ./data_csv output directories exist
    test_check_data_dirs()
    check_convert_flights_lst = check_convert_flights_to_csv()
    for i in check_convert_flights_lst:
        convert_single_flight_to_csv(i)
    return
def export_all_headers_to_csv():
    '''
    Function converts and exports all flight header data from .pkl in ./data
    to .csv in ./data_csv/flight_headers if it does not already exist there.
    Returns
    -------
    None.
    '''
    # Make sure the ./data_csv output directories exist
    test_check_data_dirs()
    check_convert_headers_lst = check_convert_headers_to_csv()
    for i in check_convert_headers_lst:
        convert_single_header_to_csv(i)
    return
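The exported values keep the units noted in the recorder comments, for example radians for pitch, bank and heading, and Rankine for exhaust gas temperature. A minimal sketch of converting a few of them to more familiar units, using only the standard library, could look like this. The helper names and the sample row are made up for illustration and are not part of ang_data_reader_utils.

```python
import math

def rankine_to_celsius(temp_r):
    """Convert a temperature from Rankine to degrees Celsius."""
    return (temp_r - 491.67) * 5.0 / 9.0

def convert_row(row):
    """Return a copy of one recorded sample with angles in degrees and EGT in Celsius."""
    converted = dict(row)
    for key in ("PLANE_PITCH_DEGREES", "PLANE_BANK_DEGREES", "HEADING_INDICATOR"):
        if key in converted:
            converted[key] = math.degrees(converted[key])
    egt_key = "GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:1"
    if egt_key in converted:
        converted[egt_key] = rankine_to_celsius(converted[egt_key])
    return converted

# Made-up sample values, not real flight data
sample = {
    "PLANE_PITCH_DEGREES": math.pi / 6,
    "GENERAL_ENG_EXHAUST_GAS_TEMPERATURE:1": 671.67,
}
print(convert_row(sample))
```

Working on a copy leaves the recorded sample untouched, so the original units remain available for comparison.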